package com.qsl.listener;

import com.qsl.support.Delivery;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Consumes messages from one or more RabbitMQ queues over a single channel and
 * buffers them in an in-memory {@link BlockingQueue}, from which callers pull
 * them with {@link #nextMessage(long)}.
 *
 * <p>Lifecycle: {@link #start()} opens a connection and channel and registers one
 * consumer per queue; {@link #stop()} closes the channel and the connection and
 * interrupts the thread that called {@code start()}.
 *
 * @author 青石路
 */
public class BlockingQueueConsumer {

    private static final Logger logger = LoggerFactory.getLogger(BlockingQueueConsumer.class);

    private final String[] queues;
    private final int prefetchCount;
    private final ConnectionFactory connectionFactory;
    /** Registered consumers keyed by queue name. */
    private final ConcurrentMap<String, InternalConsumer> consumers = new ConcurrentHashMap<>();
    /** Local buffer filled by {@link InternalConsumer#handleDelivery} and drained by {@link #nextMessage(long)}. */
    private final BlockingQueue<Delivery> queue;
    private final boolean isAutoAck;

    // volatile: written by start() and read by getChannel()/stop(), which may run on other threads.
    private volatile Channel channel;
    /** The thread that called {@link #start()}; interrupted by {@link #stop()}. */
    volatile Thread thread;

    /**
     * Creates a consumer; no connection is opened until {@link #start()} is called.
     *
     * @param connectionFactory factory used by {@link #start()} to open the connection
     * @param prefetchCount     value passed to {@code basicQos}; also the capacity of the local
     *                          buffer. {@code 0} yields an unbounded local buffer.
     * @param isAutoAck         auto-acknowledge flag passed to {@code basicConsume}
     * @param queues            names of the queues to consume from (defensively copied)
     * @throws IllegalArgumentException if {@code prefetchCount} is negative
     */
    public BlockingQueueConsumer(ConnectionFactory connectionFactory, int prefetchCount, boolean isAutoAck, String... queues) {
        this.connectionFactory = connectionFactory;
        this.prefetchCount = prefetchCount;
        this.queues = Arrays.copyOf(queues, queues.length);
        // LinkedBlockingQueue rejects a capacity of 0, so a prefetch count of 0 gets an
        // unbounded buffer; a negative value still fails fast with IllegalArgumentException.
        this.queue = prefetchCount == 0
                ? new LinkedBlockingQueue<Delivery>()
                : new LinkedBlockingQueue<Delivery>(prefetchCount);
        this.isAutoAck = isAutoAck;
    }

    /**
     * @return the channel opened by {@link #start()}, or {@code null} if {@code start()} has not run yet
     */
    public Channel getChannel() {
        return channel;
    }

    /**
     * Opens a connection and channel, applies the prefetch count and starts consuming from
     * every configured queue. The calling thread is recorded so {@link #stop()} can interrupt it.
     *
     * <p>If setup fails after the channel was opened, the channel and connection are closed
     * again before the exception is propagated, so a failed start does not leak them.
     *
     * @throws Exception if the connection or channel cannot be opened, or QoS/consume setup fails
     */
    public void start() throws Exception {
        this.thread = Thread.currentThread();
        this.channel = this.connectionFactory.newConnection().createChannel();
        try {
            setQosAndCreateConsumers();
        } catch (Exception e) {
            try {
                closeChannelAndConnection();
            } catch (Exception closeFailure) {
                // Keep the original failure as the primary exception.
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Closes the channel and its connection, then interrupts the thread that called
     * {@link #start()}. Safe to call before {@code start()} or more than once.
     *
     * @throws RuntimeException wrapping any exception raised while closing
     */
    public synchronized void stop() {
        try {
            closeChannelAndConnection();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // Always wake the consuming thread, even when closing failed.
            Thread consumerThread = this.thread;
            if (consumerThread != null) {
                consumerThread.interrupt();
            }
        }
    }

    /**
     * Closes the channel (if still open) and then the connection it belongs to (if still open).
     * Does nothing when no channel has been created yet.
     */
    private void closeChannelAndConnection() throws Exception {
        Channel current = this.channel;
        if (current == null) {
            return;
        }
        try {
            if (current.isOpen()) {
                current.close();
            }
        } finally {
            // The connection was created by start() solely for this channel, so it is closed here too.
            if (current.getConnection() != null && current.getConnection().isOpen()) {
                current.getConnection().close();
            }
        }
    }

    /** Applies the prefetch count to the channel and registers a consumer for each queue. */
    private void setQosAndCreateConsumers() throws Exception {
        this.channel.basicQos(this.prefetchCount);
        for (String queueName : this.queues) {
            consumeFromQueue(queueName);
        }
    }

    /**
     * Registers an {@link InternalConsumer} on the given queue and records it in {@link #consumers}
     * when the broker returns a consumer tag.
     *
     * @param queue name of the queue to consume from
     * @throws IOException if {@code basicConsume} fails
     */
    private void consumeFromQueue(String queue) throws IOException {
        InternalConsumer consumer = new InternalConsumer(this.channel, queue);
        String consumerTag = this.channel.basicConsume(queue, isAutoAck, consumer);
        if (consumerTag != null) {
            this.consumers.put(queue, consumer);
            logger.debug("Started on queue '{}' with tag {}: {}", queue, consumerTag, this);
        }
        else {
            logger.error("Null consumer tag received for queue {}", queue);
        }
    }

    /**
     * @return {@code true} if at least one delivery is waiting in the local buffer
     */
    protected boolean hasDelivery() {
        return !this.queue.isEmpty();
    }

    /**
     * Retrieves the next buffered delivery, waiting up to {@code timeout} milliseconds.
     *
     * @param timeout maximum time to wait, in milliseconds
     * @return the next delivery, or {@code null} if none arrived within the timeout
     * @throws InterruptedException if the waiting thread is interrupted
     */
    public Delivery nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
        return this.queue.poll(timeout, TimeUnit.MILLISECONDS);
    }

    /** Channel callback that moves deliveries for a single queue into the enclosing consumer's buffer. */
    private final class InternalConsumer extends DefaultConsumer {

        private final String queueName;

        InternalConsumer(Channel channel, String queue) {
            super(channel);
            this.queueName = queue;
        }

        /** Logs the cancellation and removes this consumer from the enclosing consumer's registry. */
        @Override
        public void handleCancel(String consumerTag) {
            logger.warn("Cancel received for {} ({}); {}", consumerTag, this.queueName, BlockingQueueConsumer.this);
            BlockingQueueConsumer.this.consumers.remove(this.queueName);
        }

        /**
         * Wraps the message in a {@link Delivery} and puts it on the local buffer, blocking
         * while the buffer is full.
         */
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                   byte[] body) {
            try {
                BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body, this.queueName));
            }
            catch (@SuppressWarnings("unused") InterruptedException e) {
                // Restore the interrupt flag for the calling thread; the delivery is not buffered.
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                BlockingQueueConsumer.logger.warn("Unexpected exception during delivery", e);
            }
        }

        @Override
        public String toString() {
            return "InternalConsumer{" + "queue='" + this.queueName + '\'' +
                    ", consumerTag='" + getConsumerTag() + '\'' +
                    '}';
        }
    }
}
